package com.kvn.universal.filter.idempotent;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.caucho.hessian.io.Hessian2Input;
import com.caucho.hessian.io.Hessian2Output;
import com.caucho.hessian.io.SerializerFactory;
import com.google.common.base.Strings;
import com.kvn.universal.core.Invoker;
import com.kvn.universal.core.InvokerContext;
import com.kvn.universal.core.idempotent.IdempotentContext;
import com.kvn.universal.dao.IIdempotentDao;
import com.kvn.universal.domain.Idempotent;
import com.kvn.universal.exception.UniversalChainErrors;
import com.kvn.universal.filter.Callback;
import com.kvn.universal.filter.Filter;

/**
 * Idempotency {@link Filter}.<br/>
 * <b>When a successful idempotent record already exists, the result of the previous call is returned
 * directly. In that case, if a {@link Callback} is configured, the callback produces the return value
 * instead.</b>
 * <p>
 * Status handling performed by {@link #invokeWapper(Invoker, InvokerContext)}:
 * <ul>
 * <li>no record: a PROCESSING record is inserted and the wrapped invoker is executed;</li>
 * <li>SUCCESS: the stored result is returned and the wrapped invoker is not executed;</li>
 * <li>FAIL: the record is switched back to PROCESSING and the wrapped invoker is executed again;</li>
 * <li>PROCESSING: the call is rejected with {@code OTHER_THREAD_PROCESSING};</li>
 * <li>anything else: the call is rejected with {@code IDENPOTENT_ERROR}.</li>
 * </ul>
 *
 * @author wzy
 * @date 2017-07-14 10:15:16 AM
 */
public class IdempotentFilter implements Filter {
	private static final Logger logger = LoggerFactory.getLogger(IdempotentFilter.class);

	/** DAO used to read, insert and update idempotent records. */
	private final IIdempotentDao idempotentDao;

	/** Optional hook that builds the return value when a SUCCESS record is replayed; may be {@code null}. */
	private final Callback callback;

	/**
	 * Creates a filter that replays the stored result when a SUCCESS record is found.
	 *
	 * @param idempotentDao DAO used to access idempotent records
	 */
	public IdempotentFilter(IIdempotentDao idempotentDao) {
		this(idempotentDao, null);
	}

	/**
	 * Creates a filter that delegates to {@code callback} when a SUCCESS record is found.
	 *
	 * @param idempotentDao DAO used to access idempotent records
	 * @param callback      custom producer of the replayed result; {@code null} means "deserialize the stored result"
	 */
	public IdempotentFilter(IIdempotentDao idempotentDao, Callback callback) {
		super();
		this.idempotentDao = idempotentDao;
		this.callback = callback;
	}

	/**
	 * Executes the wrapped invoker at most once per successful idempotent record.
	 *
	 * @param invoker the next element to invoke
	 * @param context invocation context; must carry an {@link IdempotentContext} extension
	 * @return the result of the wrapped invoker, or the replayed result of a previous successful call
	 */
	@Override
	public Object invokeWapper(Invoker invoker, InvokerContext context) {
		IdempotentHolder idempotentHolder = context.getExtendContext(IdempotentContext.class).getCurrentIdempotent();
		logger.info("IdempotentFilter start... {}", idempotentHolder);
		Idempotent idempotent = findIdempotent(idempotentHolder);
		// Guarded: the JSON dump includes the stored result and is not free to build.
		if (logger.isInfoEnabled()) {
			logger.info("IdempotentFilter - last time idempotent is : {}", JSON.toJSONString(idempotent));
		}
		// Check whether an idempotent record exists; insert one if it does not.
		if (idempotent == null) {
			idempotent = buildIdempotent(idempotentHolder);
			// Under concurrency only one insert of the same record may succeed (guaranteed by a unique constraint).
			idempotentDao.add(idempotent);
		} else if (Constants.STATUS_SUCCESS.equals(idempotent.getStatus())) {
			// A successful record exists: return right away without running the remaining filters.
			logger.info("IdempotentFilter - biz excute success last time! Return last result!");
			context.getExtendContext(IdempotentContext.class).addStepIdempotent(idempotent);
			return lastReturn(idempotent, context);
		} else if (Constants.STATUS_FAIL.equals(idempotent.getStatus())) {
			// The previous attempt failed: redo it, but only if this caller wins the FAIL -> PROCESSING switch.
			logger.warn("IdempotentFilter - biz excute fail last time, continue executing origin origin method!");
			int count = idempotentDao.updateStatus(idempotentHolder.getIdempotentId(), idempotentHolder.getBusinessId(),
					idempotentHolder.getStepId(), Constants.STATUS_PROCESSING, Constants.STATUS_FAIL, null, null, null, null);
			if (count != 1) {
				logger.error("IdempotentFilter - update status error, update count={}", count);
				throw UniversalChainErrors.OTHER_THREAD_PROCESSING.exp();
			}
		} else if (Constants.STATUS_PROCESSING.equals(idempotent.getStatus())) {
			logger.warn("IdempotentFilter - idempotent status is {}, other thread maybe executing!", idempotent.getStatus());
			throw UniversalChainErrors.OTHER_THREAD_PROCESSING.exp();
		} else {
			logger.warn("IdempotentFilter - unknown idempotent status : {}, ERROR!", idempotent.getStatus());
			throw UniversalChainErrors.IDENPOTENT_ERROR.exp();
		}

		try {
			Object rlt = invoker.invoke(context);
			// NOTE(review): a failure while persisting the result also lands in the catch below and flips the
			// record to FAIL, so a retry re-runs the business call - confirm this is intended.
			serializeReturnInfo(rlt, idempotentHolder);
			return rlt;
		} catch (Exception e) {
			logger.error("IdempotentFilter - biz execute error...", e);
			markFailed(idempotentHolder, e);
			throw e;
		}
	}

	/**
	 * Switches the record from PROCESSING to FAIL and stores a truncated error message as remark.
	 * A failure of the status update itself is logged and attached to {@code cause} as a suppressed
	 * exception so that it never hides the original error.
	 *
	 * @param idempotentHolder identifies the idempotent record
	 * @param cause            the exception raised by the business call
	 */
	private void markFailed(IdempotentHolder idempotentHolder, Exception cause) {
		String errMsg = Strings.nullToEmpty(cause.getMessage());
		// The remark keeps at most the first 120 characters of the message.
		String remark = "业务失败:" + (errMsg.length() > 120 ? errMsg.substring(0, 120) : errMsg);
		try {
			idempotentDao.updateStatus(idempotentHolder.getIdempotentId(), idempotentHolder.getBusinessId(),
					idempotentHolder.getStepId(), Constants.STATUS_FAIL, Constants.STATUS_PROCESSING, null, null, null, remark);
		} catch (RuntimeException updateError) {
			logger.error("IdempotentFilter - mark idempotent record as FAIL error", updateError);
			cause.addSuppressed(updateError);
		}
	}

	/**
	 * Persists the business result as both JSON and Hessian and switches the record from PROCESSING to
	 * SUCCESS. When replaying, the Hessian form is preferred.
	 *
	 * @param rlt              the business result; may be {@code null}
	 * @param idempotentHolder identifies the idempotent record
	 */
	private void serializeReturnInfo(Object rlt, IdempotentHolder idempotentHolder) {
		byte[] hessianBytes = toHessianBytes(rlt);
		// A null result has no runtime class; the replay side falls back to untyped deserialization.
		String returnClass = rlt == null ? null : rlt.getClass().getName();
		idempotentDao.updateStatus(idempotentHolder.getIdempotentId(), idempotentHolder.getBusinessId(),
				idempotentHolder.getStepId(), Constants.STATUS_SUCCESS, Constants.STATUS_PROCESSING,
				JSON.toJSONString(rlt), hessianBytes, returnClass, null);
	}

	/**
	 * Serializes {@code value} with Hessian 2.
	 *
	 * @param value the object to serialize; may be {@code null}
	 * @return the serialized bytes
	 */
	private static byte[] toHessianBytes(Object value) {
		// ByteArrayOutputStream holds no external resource and its close() has no effect, so it is not closed.
		ByteArrayOutputStream os = new ByteArrayOutputStream();
		Hessian2Output h2o = new Hessian2Output(os);
		try {
			h2o.writeObject(value);
			h2o.flushBuffer();
		} catch (IOException e) {
			throw UniversalChainErrors.IDENPOTENT_ERROR.exp(e);
		}
		return os.toByteArray();
	}

	/**
	 * Returns the result of the previous successful execution.
	 *
	 * @param idempotent the SUCCESS record
	 * @param context    invocation context, handed to the callback when one is configured
	 * @return the callback's value if a callback is set, otherwise the deserialized stored result
	 */
	private Object lastReturn(Idempotent idempotent, InvokerContext context) {
		// Custom result, e.g. the callback may look the value up in the DB.
		if (callback != null) {
			return callback.call(context);
		}

		// Hessian bytes win when present; otherwise fall back to the JSON text.
		SerializationType lastSerializationType = idempotent.getReturnByte() != null ? SerializationType.HESSIAN
				: SerializationType.JSON;

		if (SerializationType.HESSIAN == lastSerializationType) {
			return lastReturnByHessianDeserialization(idempotent);
		}

		if (SerializationType.JSON == lastSerializationType) {
			return lastReturnByJSONDeserialization(idempotent);
		}

		throw new RuntimeException("unknown SerializationType:" + lastSerializationType.name());
	}

	/**
	 * Rebuilds the stored result from its Hessian bytes.
	 *
	 * @param idempotent the SUCCESS record; its return bytes must not be {@code null}
	 * @return the deserialized result
	 */
	private Object lastReturnByHessianDeserialization(Idempotent idempotent) {
		// ByteArrayInputStream holds no external resource and its close() has no effect, so it is not closed.
		ByteArrayInputStream is = new ByteArrayInputStream(idempotent.getReturnByte());
		Hessian2Input h2i = new Hessian2Input(is);
		h2i.setSerializerFactory(new SerializerFactory());
		try {
			String returnClass = idempotent.getReturnClass();
			if (Strings.isNullOrEmpty(returnClass)) {
				// No class recorded (a null result was stored): read without an expected type.
				return h2i.readObject();
			}
			return h2i.readObject(Class.forName(returnClass));
		} catch (Exception e) {
			throw UniversalChainErrors.DESERIALIZATION_FAIL.exp(e, idempotent.getId());
		}
	}

	/**
	 * Rebuilds the stored result from its JSON text.
	 *
	 * @param idempotent the SUCCESS record
	 * @return the deserialized result
	 */
	private Object lastReturnByJSONDeserialization(Idempotent idempotent) {
		try {
			String returnClass = idempotent.getReturnClass();
			if (Strings.isNullOrEmpty(returnClass)) {
				// No class recorded (a null result was stored): parse without a target type.
				return JSON.parse(idempotent.getReturnInfo());
			}
			return JSON.parseObject(idempotent.getReturnInfo(), Class.forName(returnClass));
		} catch (Exception e) {
			throw UniversalChainErrors.DESERIALIZATION_FAIL.exp(e, idempotent.getId());
		}
	}

	/**
	 * Builds a new record in PROCESSING status from the identifiers carried by the holder.
	 *
	 * @param idempotentHolder source of the idempotent, business and step ids
	 * @return the record to insert
	 */
	private Idempotent buildIdempotent(IdempotentHolder idempotentHolder) {
		Idempotent idempotent = new Idempotent();
		idempotent.setIdempotentId(idempotentHolder.getIdempotentId());
		idempotent.setBusinessId(idempotentHolder.getBusinessId());
		idempotent.setStepId(idempotentHolder.getStepId());
		idempotent.setStatus(Constants.STATUS_PROCESSING);
		return idempotent;
	}

	/**
	 * Looks up the record identified by the holder's idempotent, business and step ids.
	 *
	 * @param idempotentHolder source of the lookup keys
	 * @return whatever the DAO returns for these keys; {@code invokeWapper} treats {@code null} as "no record yet"
	 */
	private Idempotent findIdempotent(IdempotentHolder idempotentHolder) {
		return idempotentDao.selectOne(idempotentHolder.getIdempotentId(), idempotentHolder.getBusinessId(),
				idempotentHolder.getStepId());
	}

	/**
	 * Status constants.
	 *
	 * @author wdh
	 */
	private static final class Constants {
		public static final String STATUS_PROCESSING = "PROCESSING";
		public static final String STATUS_SUCCESS = "SUCCESS";
		public static final String STATUS_FAIL = "FAIL";

		private Constants() {
			// constants holder, not meant to be instantiated
		}
	}
}
